package xiaoa.java.spider.TeskHandle;

import java.net.URL;

import java.util.Date;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

import org.jsoup.nodes.Document;

import com.mongodb.client.model.Filters;

import xiaoa.java.jms.rabbitMq.ProducerQueue;
import xiaoa.java.log.L;
import xiaoa.java.mongoDB.DbMgr;
import xiaoa.java.spider.bean.EsMqHtmlBean;
import xiaoa.java.spider.bean.HtmlBean;
import xiaoa.java.spider.db.vo.FetchUrl;
import xiaoa.java.utils.ExceptionUtils;

/**
 * Base class for crawler task handlers.
 *
 * <p>A handler is a {@link Runnable} that works as follows:
 * <ol>
 *   <li>It repeatedly takes a task from an inbound {@link BlockingQueue}.</li>
 *   <li>The subclass processes the task via {@link #doHandle}.</li>
 *   <li>The resulting page is published through a {@link ProducerQueue} (see {@link #doSaveEs}).</li>
 *   <li>The task's outcome (state, message, elapsed time) is recorded through {@link DbMgr}.</li>
 * </ol>
 *
 * <p>The worker loop in {@link #run()} terminates only when its thread is interrupted.
 *
 * @author xiaoa
 * @date 2017-12-09 11:35:51 AM
 * @version V1.0
 *
 * @param <B> concrete task type, a subtype of {@link FetchUrl}
 */
public abstract class FetchHandlerBase<B extends FetchUrl> implements Runnable {

	/** Inbound queue that tasks are taken from. */
	private final BlockingQueue<B> inQueue;

	/** Presumably the Elasticsearch index name used by subclasses -- TODO confirm. */
	protected static final String index = "web";

	/** Presumably the Elasticsearch document type used by subclasses -- TODO confirm. */
	protected static final String type = "html";

	/** Number of tasks submitted since the last call to {@link #speed()}. */
	private final AtomicInteger speedCount = new AtomicInteger(0);

	/** Outbound message queue that processed pages are published to. */
	private final ProducerQueue producerQueue;

	/**
	 * Creates a handler bound to an inbound task queue and an outbound producer queue.
	 *
	 * @param inQueue queue that tasks are taken from
	 * @param nodeKey key passed to {@link ProducerQueue} when creating the outbound queue
	 * @throws Throwable if the producer queue cannot be created
	 */
	public FetchHandlerBase(BlockingQueue<B> inQueue, String nodeKey) throws Throwable {

		this.inQueue = inQueue;

		// Initialise the outbound queue.
		this.producerQueue = new ProducerQueue(nodeKey);
	}


	/**
	 * Takes the next task, blocking until one is available on the inbound queue.
	 *
	 * @return the next task
	 * @throws Exception typically {@link InterruptedException} if the waiting thread is interrupted
	 */
	public B getTask() throws Exception {
		return inQueue.take();
	}

	/**
	 * Returns the number of tasks submitted since the previous call and resets the counter to 0.
	 *
	 * <p>The read and the reset happen as one atomic operation. No increment made concurrently by
	 * a worker thread between them can be lost.
	 *
	 * @return tasks submitted since the last call
	 */
	public int speed() {
		return speedCount.getAndSet(0);
	}


	/**
	 * Worker loop: takes tasks one by one, handles them and records each outcome.
	 *
	 * <p>A failure while handling a task is printed and recorded on that task via
	 * {@link #submitTaskFirst}. The loop then continues with the next task.
	 *
	 * <p>The loop exits only when the thread is interrupted. The interrupt status is then
	 * restored so the owner of the thread (e.g. an executor) can observe it.
	 */
	@Override
	public void run() {

		Thread.currentThread().setName("handler_" + Thread.currentThread().getName());

		// Loop flag; cleared only when the thread is interrupted.
		boolean bu = true;

		while (bu) {

			long startTime = System.currentTimeMillis();

			Throwable thro = null;
			B task = null;
			try {
				task = getTask();

				// A null task (possible if getTask() is overridden) is simply skipped.
				if (task != null) {
					// Handle the task and publish the result.
					doSaveEs(doHandle(task), new URL(task.getUrl()));
				}
			} catch (InterruptedException e) {
				// Shutdown request: record it on the current task (if any) and leave the loop.
				// InterruptedException is normally thrown with the interrupt flag already cleared,
				// so the flag is re-asserted after the loop, once the task has been submitted.
				thro = e;
				bu = false;
			} catch (Throwable e) {
				thro = e;
				e.printStackTrace();
			} finally {
				// Only tasks that were actually taken are submitted and counted by speed().
				if (task != null) {
					try {
						submitTaskFirst(task, startTime, System.currentTimeMillis(), thro);
					} catch (Throwable e) {
						e.printStackTrace();
					}
				}
			}
		}

		// The loop only ends after an interrupt; restore the status the catch above consumed.
		Thread.currentThread().interrupt();

		L.info("============== 线程死亡 threadName = " + Thread.currentThread().getName() + " bu = " + bu);
	}


	/**
	 * Processes a single task.
	 *
	 * @param task the task to process
	 * @return the resulting page, or {@code null} if nothing should be published
	 *         ({@link #doSaveEs} ignores a null document)
	 * @throws Throwable any failure; it is recorded on the task by the worker loop
	 */
	protected abstract Document doHandle(B task) throws Throwable;


	/**
	 * Records the outcome of a task and persists it with {@link DbMgr#update}.
	 *
	 * <p>The following fields are set before the update:
	 * <ul>
	 *   <li>the elapsed time (milliseconds, truncated to int);</li>
	 *   <li>the update time;</li>
	 *   <li>{@code STATE_SUCC} with the message "处理成功" when {@code thro} is null;</li>
	 *   <li>otherwise {@code STATE_FAIL} with the formatted exception as the message.</li>
	 * </ul>
	 * A null task is ignored.
	 *
	 * @param task the processed task, may be null
	 * @param startTime processing start, epoch milliseconds
	 * @param endTime processing end, epoch milliseconds
	 * @param thro the failure, or null on success
	 * @throws Throwable if the database update fails
	 */
	protected void submitTask(B task, long startTime, long endTime, Throwable thro) throws Throwable {

		if (task == null) {
			return;
		}

		task.setUseTime((int) (endTime - startTime));
		task.setUpdateTime(new Date());
		if (thro == null) {
			task.setState(FetchUrl.STATE_SUCC);
			task.setMessage("处理成功");
		} else {
			task.setState(FetchUrl.STATE_FAIL);
			task.setMessage(ExceptionUtils.exceptionToString(thro));
		}

		DbMgr.update(task, task.getId(), task.getClass());
	}


	/**
	 * Resets URLs left in {@code STATE_IN} back to {@code STATE_WAIT}.
	 *
	 * <p>These are URLs whose processing was interrupted by an unexpected shutdown.
	 *
	 * @param cla entity class whose collection is updated
	 * @return number of records reset
	 * @throws Throwable if the database update fails
	 */
	public long resutTask(Class<B> cla) throws Throwable {

		FetchUrl url = new FetchUrl();
		url.setState(FetchUrl.STATE_WAIT);

		long total = DbMgr.updateMany(url, Filters.eq("state", FetchUrl.STATE_IN), cla);

		L.info("重置:" + total);

		return total;
	}


	/**
	 * Stores newly discovered tasks as waiting work.
	 *
	 * <p>The list is first passed through {@link #filter}. Each remaining task is then stamped
	 * with a creation time and {@code STATE_WAIT}, and the list is bulk-inserted with
	 * {@link DbMgr#insertMany}.
	 *
	 * <p>A null list, or a list that is empty after filtering, is a no-op.
	 *
	 * @param taskList tasks to store; may be modified in place by {@link #filter}
	 * @throws Throwable if filtering or the database insert fails
	 */
	protected void doSaveOutQueue(List<B> taskList) throws Throwable {

		// Guard before filter()/iteration dereference the list.
		if (taskList == null) {
			return;
		}

		long startTime = System.currentTimeMillis();

		// Drop duplicates (subclass hook).
		filter(taskList);

		if (taskList.isEmpty()) {
			return;
		}

		// Mark every remaining task as new, waiting work.
		for (B url : taskList) {
			url.setCreateTime(new Date());
			url.setState(FetchUrl.STATE_WAIT);
		}

		DbMgr.insertMany(taskList, taskList.get(0).getClass());

		L.info(startTime, System.currentTimeMillis(), "插入成功 count ：  " + taskList.size());
	}

	/**
	 * Hook for removing unwanted (e.g. duplicate) tasks before they are stored.
	 *
	 * <p>Subclasses must remove entries from {@code list} in place, since the method returns
	 * nothing. The default implementation does nothing.
	 *
	 * @param list tasks about to be stored; never null when called from {@link #doSaveOutQueue}
	 * @throws Throwable on filtering failure
	 */
	protected void filter(List<B> list) throws Throwable {}


	/**
	 * Submits a task's outcome via {@link #submitTask} and then counts it towards {@link #speed()}.
	 *
	 * <p>The counter is not incremented if {@link #submitTask} throws.
	 *
	 * @param task the processed task
	 * @param startTime processing start, epoch milliseconds
	 * @param endTime processing end, epoch milliseconds
	 * @param thro the failure, or null on success
	 * @throws Throwable if submission fails
	 */
	protected void submitTaskFirst(B task, long startTime, long endTime, Throwable thro) throws Throwable {
		submitTask(task, startTime, endTime, thro);
		// Count this task for speed().
		speedCount.incrementAndGet();
	}


	/**
	 * Publishes a parsed page to the outbound {@link ProducerQueue}.
	 *
	 * <p>The page is wrapped in an {@link EsMqHtmlBean}. Presumably a consumer indexes it into
	 * Elasticsearch; verify against the consumer. A null document is ignored.
	 *
	 * @param document the parsed page, may be null
	 * @param url the page URL
	 * @throws Throwable if publishing fails
	 */
	protected void doSaveEs(Document document, URL url) throws Throwable {

		if (document == null) {
			return;
		}

		HtmlBean data = new HtmlBean();
		data.setHtml(document.html());
		data.setTitle(document.title());
		data.setText(document.text());
		data.setUrl(url.toString());
		data.setIndexTime(new Date());
		// NOTE(review): System.nanoTime() is not guaranteed unique across handler threads or JVMs;
		// consider a UUID if ids must never collide -- confirm with the consumer's id expectations.
		data.setId(System.nanoTime() + "");

		EsMqHtmlBean bean = new EsMqHtmlBean();
		bean.setData(data);
		bean.setId(data.getId());

		// Publish.
		producerQueue.doSendMessage(bean);
	}

}
